package com.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.app.function.CustomerDeserialization;
import com.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Description: TODO QQ1667847363
 * @author: xiao kun tai
 * @date:2022/1/1 20:42
 */
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO:Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
        /*//开启CK并指定状态后端为FS menory fs rocksdb
        env.setStateBackend(new FsStateBackend("hdfs://192.168.88.109:9820/gmall-flink/ck"));
        //开启 Checkpoint,每隔 5 秒钟做一次 CK
        env.enableCheckpointing(5000L);
        //指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        //设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000L));
        //设置访问 HDFS 的用户名
        System.setProperty("HADOOP_USER_NAME", "root");*/

        //通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("192.168.88.109")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();


        DataStreamSource<String> streamSource = env.addSource(sourceFunction);


        //将数据写入Kafka
        String sinkTopic="ods_base_db";
        System.out.println("任务开始>>>>>>>>>>>>>>>>>>");
        streamSource.print(">>>>>>>>>>>>>>>>>>>");
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
        //启动
        env.execute("FlinkCDC");
    }
}
